// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use restate_storage_api::invocation_status_table::InvocationStatus;
use restate_types::invocation::InvocationEpoch;
use restate_types::journal_v2::CompletionId;

/// A few useful helpers for [`InvocationStatus`], used by the state machine.
pub(super) trait InvocationStatusExt {
    /// Returns true if this invocation should accept the completion with the given source
    /// invocation epoch and completion id.
    ///
    /// * `source_invocation_epoch` - the invocation epoch the completion is attributed to.
    /// * `completion_id` - the id of the completion being evaluated.
    ///
    /// A `false` result means the caller should not apply the completion to this invocation.
    fn should_accept_completion(
        &self,
        source_invocation_epoch: InvocationEpoch,
        completion_id: CompletionId,
    ) -> bool;
}

impl InvocationStatusExt for InvocationStatus {
    /// Decides whether a completion attributed to `this_completion_invocation_epoch` should be
    /// applied to this invocation.
    ///
    /// Returns `false` when the status exposes no invocation metadata; otherwise returns whether
    /// the completion's epoch is at least the epoch that `completion_range_epoch_map` reports
    /// for `completion_id`.
    fn should_accept_completion(
        &self,
        this_completion_invocation_epoch: InvocationEpoch,
        completion_id: CompletionId,
    ) -> bool {
        // Without invocation metadata there is no epoch map to consult. This is not an
        // in-flight state, so the completion can simply be ignored.
        let Some(metadata) = self.get_invocation_metadata() else {
            return false;
        };

        // We only accept completions for commands that have not been trimmed.
        // completion_range_epoch_map maps ranges of completion ids to an invocation epoch;
        // adding a trim point records the new epoch for the trimmed completion ids
        // (see the `trim_at_2` test in this file).
        //
        // A completion whose epoch is lower than the epoch recorded for its id was produced
        // before that trim and is therefore stale; anything at or above that epoch is accepted.
        //
        // See the unit tests of CompletionRangeEpochMap for a good explanation of the different cases.
        let epoch_threshold = metadata
            .completion_range_epoch_map
            .maximum_epoch_for(completion_id);

        this_completion_invocation_epoch >= epoch_threshold
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use restate_storage_api::invocation_status_table::{
        CompletionRangeEpochMap, InFlightInvocationMetadata,
    };

    /// Builds an `Invoked` status from the mock in-flight metadata, overriding only its
    /// completion range epoch map.
    fn invoked_with_epoch_map(epoch_map: CompletionRangeEpochMap) -> InvocationStatus {
        let metadata = InFlightInvocationMetadata {
            completion_range_epoch_map: epoch_map,
            ..InFlightInvocationMetadata::mock()
        };
        InvocationStatus::Invoked(metadata)
    }

    #[test]
    fn default() {
        // With the default map, every (epoch, completion id) pair checked here is accepted.
        let status = invoked_with_epoch_map(CompletionRangeEpochMap::default());

        for (epoch, completion_id) in [(0, 0), (0, 1), (1, 1)] {
            assert!(
                status.should_accept_completion(epoch, completion_id),
                "expected completion id {completion_id} from epoch {epoch} to be accepted"
            );
        }
    }

    #[test]
    fn trim_at_2() {
        // ## Scenario
        // The journal looks as follows:
        // journal index 1 -> command 1 with completion id 1
        // journal index 2 -> command 2 with completion id 2

        // The journal is trimmed at completion id 2, bumping the invocation epoch from 0 to 1.
        let mut epoch_map = CompletionRangeEpochMap::default();
        epoch_map.add_trim_point(2, 1);
        let status = invoked_with_epoch_map(epoch_map);

        // A completion with id 2 coming from epoch 0 belongs to the previous epoch: discard it.
        assert!(!status.should_accept_completion(0, 2));

        // Every other combination is accepted.
        for (epoch, completion_id) in [(0, 1), (1, 1), (1, 2)] {
            assert!(
                status.should_accept_completion(epoch, completion_id),
                "expected completion id {completion_id} from epoch {epoch} to be accepted"
            );
        }
    }
}
